package com.joysuccess.snmptrap.snmptrap;

import com.joysuccess.common.utils.HelpUtils;
import com.joysuccess.snmptrap.rules.Alarm;
import com.joysuccess.snmptrap.rules.AlarmField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.snmp4j.*;
import org.snmp4j.mp.MPv1;
import org.snmp4j.mp.MPv2c;
import org.snmp4j.mp.MPv3;
import org.snmp4j.security.SecurityModels;
import org.snmp4j.security.SecurityProtocols;
import org.snmp4j.security.USM;
import org.snmp4j.smi.*;
import org.snmp4j.transport.DefaultTcpTransportMapping;
import org.snmp4j.transport.DefaultUdpTransportMapping;
import org.snmp4j.util.MultiThreadedMessageDispatcher;
import org.snmp4j.util.ThreadPool;
import org.springframework.kafka.core.KafkaTemplate;

import java.io.IOException;
import java.text.ParseException;
import java.util.HashMap;
import java.util.Map;
import java.util.Vector;

import static com.joysuccess.snmptrap.util.BeanRefUtil.setFieldValue;
import static com.joysuccess.snmptrap.util.SnmpUtil.dealOctetStr;

/**
 * Receives SNMP traps and forwards them to Kafka.
 *
 * <p>The receiver registers the v1, v2c and v3 message processing models, maps every
 * variable binding of an incoming PDU onto an {@link Alarm} field (looked up through
 * {@link AlarmField}) and sends the string returned by {@code Alarm#alarmToJsonString}
 * to the alarm topic.
 *
 * <p>Messages are dispatched through a {@link MultiThreadedMessageDispatcher} backed by a
 * {@link ThreadPool}, so {@link #processPdu(CommandResponderEvent)} is expected to run on
 * pool worker threads rather than on the thread that called {@link #trapReceiverRun()}.
 *
 * @author joysuccess
 */
public class SnmpTrapMultiThreadReceiver implements CommandResponder {

    private static final Logger LOGGER = LoggerFactory.getLogger(SnmpTrapMultiThreadReceiver.class);

    /** Kafka topic that parsed alarms are published to. */
    private static final String ALARM_TOPIC = "alarm-messages";

    /** System property that, when set, overrides the address built from ip/port. */
    private static final String LISTEN_ADDRESS_PROPERTY = "snmp4j.listenAddress";

    /** Number of trailing OID characters used to look up the alarm field name. */
    private static final int OID_SUFFIX_LENGTH = 11;

    private MultiThreadedMessageDispatcher dispatcher;
    private Snmp snmp = null;
    private Address listenAddress;
    private ThreadPool threadPool;

    /** Raw type kept on purpose so existing callers of the constructor keep compiling. */
    private final KafkaTemplate kafkaTemplate;
    private final String snmpTrapIp;
    private final String snmpTrapPort;

    /**
     * Creates a receiver without Kafka template, ip or port. Such an instance can only start
     * when the {@code snmp4j.listenAddress} system property is set, and it logs parsed traps
     * without publishing them.
     */
    public SnmpTrapMultiThreadReceiver() {
        this(null, null, null);
    }

    /**
     * @param kafkaTemplate template used to publish parsed alarms
     * @param snmpTrapIp    ip address to listen on (used when the system property is absent)
     * @param snmpTrapPort  port to listen on (used when the system property is absent)
     */
    public SnmpTrapMultiThreadReceiver(KafkaTemplate kafkaTemplate, String snmpTrapIp, String snmpTrapPort) {
        this.kafkaTemplate = kafkaTemplate;
        this.snmpTrapIp = snmpTrapIp;
        this.snmpTrapPort = snmpTrapPort;
    }

    /**
     * Builds the transport, dispatcher and SNMP session and starts listening.
     *
     * @throws IOException if no usable listen address is available, the address uses an
     *                     unsupported transport, or the transport cannot be opened
     */
    private void init() throws IOException {
        String configuredAddress = System.getProperty(LISTEN_ADDRESS_PROPERTY);
        if (configuredAddress == null && (snmpTrapIp == null || snmpTrapPort == null)) {
            // Without this check the default would silently become "udp:null/null".
            throw new IOException("No listen address configured: snmpTrapIp=" + snmpTrapIp
                    + ", snmpTrapPort=" + snmpTrapPort);
        }
        String addressSpec = configuredAddress != null
                ? configuredAddress
                : "udp:" + snmpTrapIp + "/" + snmpTrapPort;

        listenAddress = GenericAddress.parse(addressSpec);
        if (listenAddress == null) {
            throw new IOException("Listen address could not be parsed: " + addressSpec);
        }

        threadPool = ThreadPool.create("TrapPool", 2);
        dispatcher = new MultiThreadedMessageDispatcher(threadPool, new MessageDispatcherImpl());

        TransportMapping transport;
        if (listenAddress instanceof UdpAddress) {
            transport = new DefaultUdpTransportMapping((UdpAddress) listenAddress);
        } else if (listenAddress instanceof TcpAddress) {
            transport = new DefaultTcpTransportMapping((TcpAddress) listenAddress);
        } else {
            // Previously any non-UDP address was blindly cast to TcpAddress.
            throw new IOException("Unsupported listen address type: " + addressSpec);
        }

        snmp = new Snmp(dispatcher, transport);
        snmp.getMessageDispatcher().addMessageProcessingModel(new MPv1());
        snmp.getMessageDispatcher().addMessageProcessingModel(new MPv2c());
        snmp.getMessageDispatcher().addMessageProcessingModel(new MPv3());
        USM usm = new USM(SecurityProtocols.getInstance(),
                new OctetString(MPv3.createLocalEngineID()), 0);
        SecurityModels.getInstance().addSecurityModel(usm);

        // Register the responder before listening so no trap arrives without a handler.
        snmp.addCommandResponder(this);
        snmp.listen();
    }

    /**
     * Starts the trap receiver. Start-up failures are logged and the resources allocated so
     * far are released; no exception is propagated to the caller.
     */
    public void trapReceiverRun() {
        try {
            init();
            LOGGER.info("SnmpTrap启动成功，启动端口为：{}:{}等待Trap message -----", snmpTrapIp, snmpTrapPort);
        } catch (Exception ex) {
            LOGGER.error("Failed to start SNMP trap receiver", ex);
            releaseResources();
        }
    }

    /** Closes the SNMP session and cancels the worker pool after a failed start. */
    private void releaseResources() {
        if (snmp != null) {
            try {
                snmp.close();
            } catch (IOException closeEx) {
                LOGGER.warn("Failed to close SNMP session", closeEx);
            }
            snmp = null;
        }
        if (threadPool != null) {
            threadPool.cancel();
            threadPool = null;
        }
    }

    /**
     * Converts the variable bindings of a received PDU into an {@link Alarm} and sends it to
     * Kafka. Bindings whose OID is shorter than {@value #OID_SUFFIX_LENGTH} characters or
     * whose OID suffix has no matching alarm field are logged and skipped.
     *
     * @param crEvent event delivered by the SNMP4J dispatcher; ignored when it or its PDU is null
     */
    @Override
    @SuppressWarnings("unchecked")
    public void processPdu(CommandResponderEvent crEvent) {
        // The null guard must come before any access to the event.
        if (crEvent == null || crEvent.getPDU() == null) {
            LOGGER.warn("ResponderEvent or PDU is null");
            return;
        }

        Vector<? extends VariableBinding> bindings = crEvent.getPDU().getVariableBindings();
        Alarm alarm = new Alarm();
        LOGGER.info("获取到了snmp trap信息准备开始解析++++++++++++++++++++++++snmp trap start+++++ ");
        for (VariableBinding vb : bindings) {
            String oid = vb.getOid().toString();
            int length = oid.length();
            if (length < OID_SUFFIX_LENGTH) {
                LOGGER.error("{}：这个oid长度太短了，在字典中，没有这么短类型的oid", oid);
                continue;
            }

            // Field lookup is keyed on the trailing characters of the OID.
            String oidSuffix = oid.substring(length - OID_SUFFIX_LENGTH, length);
            String fieldName = AlarmField.getFieldNameByContains(oidSuffix);
            if (HelpUtils.isNotEmpty(fieldName)) {
                String value = dealOctetStr(vb.getVariable().toString(), "GBK");
                Map<String, String> fieldValues = new HashMap<>();
                fieldValues.put(fieldName, value);
                setFieldValue(alarm, fieldValues);
                LOGGER.info("fieldName:{}", fieldName);
                LOGGER.info("OID:{}", oid);
                LOGGER.info("Value:{}", value);
            } else {
                LOGGER.error("{}：这个oid类型，在Alarm枚举类型中不存在", oid);
            }
        }
        LOGGER.info("获取到了snmp trap信息解析完毕++++++++++++++++++++++++snmp trap end+++++ ");

        if (kafkaTemplate == null) {
            // Happens when the receiver was built with the no-arg constructor.
            LOGGER.error("KafkaTemplate is not configured; parsed alarm is not sent");
            return;
        }
        LOGGER.info("Send snmp Data to Kafka===============");
        kafkaTemplate.send(ALARM_TOPIC, alarm.alarmToJsonString(alarm));
    }

    /**
     * Stand-alone entry point. It uses the no-arg constructor, so the listen address must be
     * supplied through the {@code snmp4j.listenAddress} system property.
     */
    public static void main(String[] args) throws ParseException {
        SnmpTrapMultiThreadReceiver stmtr = new SnmpTrapMultiThreadReceiver();
        stmtr.trapReceiverRun();
    }
}
